
WIP co-assign related root-ish tasks #4899

Closed

Conversation

gjoseph92
Collaborator

In decide_worker, rather than spreading out root tasks as much as possible, schedule consecutive (by priority order) root(ish) tasks on the same worker. This ensures the dependencies of a reduction start out on the same worker, reducing future data transfer.
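
A minimal, self-contained sketch of that idea follows, using simplified stand-ins for distributed's TaskGroup and WorkerState; the names and arithmetic here are illustrative, not the PR's actual code:

```python
import math
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Worker:
    name: str
    nthreads: int
    occupancy: float = 0.0


@dataclass
class TaskGroup:
    n_tasks: int
    last_worker: Optional[Worker] = None
    tasks_left: int = 0  # consecutive tasks still owed to last_worker


def pick_worker(group: TaskGroup, workers: List[Worker]) -> Worker:
    """Keep sending consecutive root-ish tasks to the previous worker until it
    has roughly its share of the group, then switch to the least-busy worker."""
    if group.last_worker is not None and group.tasks_left > 0:
        group.tasks_left -= 1
        return group.last_worker

    total_nthreads = sum(w.nthreads for w in workers)
    ws = min(workers, key=lambda w: w.occupancy)
    group.last_worker = ws
    group.tasks_left = math.floor(group.n_tasks / total_nthreads)
    return ws
```

Each fresh pick then covers the chosen worker for roughly `len(group) / total_nthreads` consecutive tasks, which is the role the `_last_worker_tasks_left` counter plays in the diff below.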

            sum(map(len, group._dependencies)) < 5  # TODO what number
        )
    ):
        group._last_worker_tasks_left -= 1
Member

In general I think that having a counter like this opens us up to failures in some complex situations.

For example, what happens when we lose a few workers and need to recompute these tasks and this count goes negative?

I would encourage us to keep thinking about comparing occupancy between the best and recent workers. That feels like a strictly local decision that might be more robust.
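
For illustration only, such a comparison might look roughly like this; it is a sketch of the general shape, not the logic in this PR or in #4922, and the `nudge` threshold is an invented parameter:

```python
def choose_worker(last_worker, best_worker, nudge: float = 0.1):
    """Stay on the task group's previous worker unless the least-occupied
    worker is meaningfully less busy; no counters, so nothing can go negative."""
    if last_worker is not None and last_worker.occupancy <= best_worker.occupancy + nudge:
        return last_worker
    return best_worker
```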

Member

If the problem is that duration_average is unknown that's fine, we should replace it with whatever is used to increment the worker's occupancy when the previous tasks were added.
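
A sketch of that fallback, assuming the estimate falls back to a fixed default for unknown durations; the 0.5s value and the helper name are assumptions, not necessarily what distributed uses:

```python
def estimated_duration(ts, unknown_task_duration: float = 0.5) -> float:
    """Use the measured prefix average when there is one; otherwise fall back
    to the same default that the occupancy bookkeeping would have used."""
    duration = ts._prefix._duration_average
    return duration if duration >= 0 else unknown_task_duration  # -1 means "unknown"
```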

Collaborator Author

Yeah, this will probably get thrown off by both new workers arriving and workers leaving. I'll think more about how to use something less stateful.


total_nthreads = sum(
    wws._nthreads for wws in candidates
)  # TODO get `self._total_threads` from scheduler? Though that doesn't account for worker restrictions.
Member

Common case, if len(candidates) < len(self.workers) ... else self._total_threads
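
Spelled out, that shorthand presumably amounts to something like the helper below; this is my reading of the suggestion, with generic names, not code from the PR:

```python
def group_total_nthreads(candidates, all_workers, scheduler_total_nthreads):
    """Only walk the candidate set when it is a strict subset of all workers;
    otherwise reuse the scheduler-wide precomputed thread count."""
    if len(candidates) < len(all_workers):
        return sum(ws.nthreads for ws in candidates)
    return scheduler_total_nthreads
```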

Collaborator Author

Yeah. More that this function isn't a method of SchedulerState, so we'll have to pass `self._total_threads` in here (fine, just slightly awkward).

if (
    ws is not None  # there is a previous worker
    and group._last_worker_tasks_left > 0  # previous worker not fully assigned
    and ts._dependents  # task has dependents
Member

I recommend skipping the dependents check. This can change in the future. Consider the following case:

df = dd.read_parquet(...).persist()

df.x.mean()

Collaborator Author

That's cool (and the first non-performance-related advantage I see to this approach over finding relatives in the graph structure). So neighboring partitions of df would be stored on the same workers, before we even know what we're going to do with them.

Comment on lines 7526 to 7600
and (  # is a root-like task (task group depends on very few tasks)
    sum(map(len, group._dependencies)) < 5  # TODO what number
)
Member

Does this mean that only groups of size N (=5) are grouped together and everything above is distributed randomly?

If my understanding here is correct, we might want to choose 32 (+/-1?) since this is the default branching factor for tree reductions in dask, last time I checked

Member

No, this is restricting this behavior to task groups where the size of the task group is large, but the number of dependencies of that group is small. This is true in situations like the following:

df = dd.read_parquet(...)  # thousands of tasks, but zero dependencies

x = da.from_zarr(...)  # thousands of tasks, but one dependency (the zarr file)

But not in situations like the following:

y = x + 1  # thousands of tasks, but thousands of dependencies

@gjoseph92
Collaborator Author

@mrocklin here's the xarray/zarr issue I'm running into.

Running this code on my branch behaves the same as on main. Note that this is the exact same array-sum test as #4864, just on a zarr opened in xarray. I haven't tried taking xarray out of the equation yet and using plain zarr. This is exactly the sort of case we hoped to make an exception for as you described in #4899 (comment).

What I'm struggling to debug is that, judging from print statements added to decide_worker (see latest commit), decide_worker doesn't even seem to be called on the `sum-` root tasks, so we never get a chance to schedule them smartly. I don't understand how this can happen.

# test_zarr.py
import xarray as xr
import dask.array as da
from distributed import Client, LocalCluster


if __name__ == "__main__":
    cluster = LocalCluster(
        processes=True, n_workers=4, threads_per_worker=1, memory_limit=0
    )
    client = Client(cluster)

    # Write a zarr array to disk (requires 100GB free disk space!)
    # Comment this out once you've run it once.
    data = da.zeros((12500000, 1000), chunks=(12500000, 1))
    ds = xr.Dataset({"data": (("x", "y"), data)})
    ds.to_zarr("test.zarr")
    print("Saved zarr")

    # Do the same array-sum example, but from zarr.
    # This _should_ behave just as well, but on the dashboard we can see lots of transfers.
    # In fact, the dashboard looks the same as running this test on main.
    ds_zarr = xr.open_zarr("test.zarr")
    ds_zarr.sum("y").data.data.compute()
    # TODO where is `sum-` getting scheduled?? Doesn't seem to be going through `decide_worker`

Here's the full (optimized) graph with order coloring. You can see that it looks just like the non-zarr one, except every sum task depends on the one zarr array task: mydask.pdf


Unfortunately `ts._prefix._duration_average` == -1 for all the root tasks we care about, so this won't work.
When a task is root-like and the previous worker is full, we don't want to use the normal `decide_worker` logic, since that only considers as candidates the workers that have the deps of the task. Since the task only has 1-5 deps, we'd only ever consider the same 1-5 workers.
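
For context, a sketch of that candidate-set point; attribute names mirror the scheduler's TaskState/WorkerState, but this is illustrative rather than the actual decide_worker code:

```python
def candidate_workers(ts, all_workers, rootish: bool):
    """The usual pool is only the workers already holding ts's dependencies;
    for a root-ish task with 1-5 deps that is just 1-5 workers, so the
    root-ish path widens the pool to every (valid) worker instead."""
    if rootish:
        return set(all_workers)
    return {ws for dts in ts._dependencies for ws in dts._who_has}
```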
ws: WorkerState = group._last_worker

if valid_workers is not None:
    total_nthreads = sum(wws._nthreads for wws in valid_workers)
Member

This walks through all workers for all tasks. We may not be able to do this.

Collaborator Author

See below; I believe valid_workers is None is the common case? Agreed that this isn't ideal though. But if there are worker restrictions, ignoring them and just using self._total_nthreads could be wildly wrong (imagine 10 GPU workers and 100 CPU workers for a task group of 50 that needs to run on GPUs). Maybe there's a cheaper measurement?

Member

OK, grand.

if valid_workers is not None:
    total_nthreads = sum(wws._nthreads for wws in valid_workers)

group_tasks_per_worker = len(group) / total_nthreads
Member

What happens here if valid_workers is None? It looks like total_nthreads might be undefined

Collaborator Author

Then we use the self._total_nthreads that SchedulerState.decide_worker passed in (see https://github.com/dask/distributed/pull/4899/files#diff-bbcf2e505bf2f9dd0dc25de4582115ee4ed4a6e80997affc7b22122912cc6591R2376). So in the common case of no worker restrictions, we won't have to do the total_nthreads calculation.


group._last_worker = ws
group._last_worker_tasks_left = math.floor(group_tasks_per_worker)
group._last_worker_priority = ts.priority
Member

Do we want to do this when we're not in a root-ish task situation?

Collaborator Author

Yes, because we're also using this as our base case when group._last_worker is not set. So for the first root-ish task, we fall through to here, and then need to store our decision for the next decide_worker cycle. We could skip doing this if we've identified that the task is not root-like (which would save us from ever checking that again), though it might make the code a little more tangled.

Member

> So for the first root-ish task, we fall through to here

We can maybe guard this code block with a condition that we're root-ish then?

Collaborator Author

Yes—though we currently skip determining if we're root-ish when last_worker is None. So we'd need some other sentinel value to distinguish "no last worker because we haven't checked yet if it's root-ish" from "no last worker because it's not root-ish".
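
One hypothetical way to carry that extra state without a separate flag (a later commit in this PR overloads `_last_worker` in a similar spirit):

```python
# Hypothetical sentinel; not the PR's exact code.
NOT_ROOTISH = object()

# group._last_worker could then take three values:
#   None           -> we haven't yet checked whether this group is root-ish
#   NOT_ROOTISH    -> checked before: not root-ish, skip the co-assignment path
#   a WorkerState  -> root-ish: co-assign here while _last_worker_tasks_left > 0
```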

Member

We currently check the following:

        and group_tasks_per_worker > 1  # group is larger than cluster
        and (  # is a root-like task (task group is large, but depends on very few tasks)
            sum(map(len, group._dependencies)) < 5  # TODO what number
        )

I recommend that we define that as a variable at the top and then use it in a couple of places. Would that work?
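
One possible shape of that refactor, as a sketch under the PR's draft attribute names; the least-occupied pick below stands in for the normal selection logic and is not the PR's code:

```python
import math


def decide_worker_sketch(group, workers, total_nthreads):
    """Compute the root-ish check once, then reuse it both for the early
    co-assignment exit and for the bookkeeping at the end (illustrative only)."""
    group_tasks_per_worker = len(group) / total_nthreads
    is_rootish = (
        group_tasks_per_worker > 1  # group is larger than cluster
        and sum(map(len, group._dependencies)) < 5  # TODO what number
    )

    if (
        is_rootish
        and group._last_worker is not None
        and group._last_worker_tasks_left > 0
    ):
        group._last_worker_tasks_left -= 1
        return group._last_worker

    ws = min(workers, key=lambda w: w._occupancy)  # stand-in for the normal logic

    if is_rootish:  # only track co-assignment state for root-ish groups
        group._last_worker = ws
        group._last_worker_tasks_left = math.floor(group_tasks_per_worker)
    return ws
```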

Only compute `total_nthreads` when a new worker is needed, and only compute the number of dependencies once per task group. Overloads the meaning of `_last_worker` to indicate if we've decided in the past whether a TaskGroup is root-ish or not.
@mrocklin
Member

Alternate solution that uses occupancy here: #4922

In the above case, we want ``a`` and ``b`` to run on the same worker,
and ``c`` and ``d`` to run on the same worker, reducing future
data transfer. We can also ignore the location of ``X``, because
as a common dependency, it will eventually get transferred everywhere.
Member

<3 the ascii art

Comment/question: Do we want to explain all of this here? Historically I haven't put the logic behind heuristics in the code. This is a subjective opinion, and far from universal, but I find that heavily commented/documented logic makes it harder to understand the code at a glance. I really like that the current decide_worker implementation fits in a terminal window. I think that single-line comments are cool, but that long multi-line comments would be better written as documentation.

Thoughts? If you are not in disagreement then I would encourage us to write up a small docpage or maybe a blogpost and then link to that external resource from the code.

Collaborator Author

I was also planning on updating https://distributed.dask.org/en/latest/scheduling-policies.html#choosing-workers, probably with this same ascii art. So just linking to that page in the docstring seems appropriate.

This reverts commit 0d1e2380b525b0ee7b4e60d9bee62f889ed5520b.
Maybe avoiding -1 is excessive; I just wanted it to still work if we changed to a Py_ssize_t
Arguably would be better to check that `ws` is in (valid_workers or all_workers); downside is that the common `all_workers` would require an O(n) search, since it's only `dict_values`, not `dict`.
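
For reference, a tiny illustration of the `dict_values` point; this is standard Python behavior, nothing specific to distributed:

```python
workers = {"tcp://a:1234": "worker-a", "tcp://b:1234": "worker-b"}

all_workers = workers.values()           # a dict_values view, not a set or dict
print("worker-b" in all_workers)         # True, but found by a linear O(n) scan
print("worker-b" in set(all_workers))    # O(1) average lookup after an O(n) set build
```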
@gjoseph92
Collaborator Author

Closing in favor of #4967, which is similar logic but more concise. It also removes _last_worker_tasks_left and _last_worker_priority, and puts the root-ish checks in SchedulerState.decide_worker, where they're a little easier to do.

gjoseph92 closed this Jun 24, 2021
Successfully merging this pull request may close these issues: Co-assign neighboring tasks to neighboring workers.